[[container-props]]
= Listener Container Properties

.`ContainerProperties` Properties
[cols="13,9,16", options="header"]
|===
| Property
| Default
| Description

|[[ackCount]]<<ackCount,`ackCount`>>
|1
|The number of records before committing pending offsets when the `ackMode` is `COUNT` or `COUNT_TIME`.

|[[adviceChain]]<<adviceChain,`adviceChain`>>
|`null`
|A chain of `Advice` objects (e.g. `MethodInterceptor` around advice) wrapping the message listener, invoked in order.

|[[ackMode]]<<ackMode,`ackMode`>>
|BATCH
|Controls how often offsets are committed - see xref:kafka/receiving-messages/message-listener-container.adoc#committing-offsets[Committing Offsets].

|[[ackTime]]<<ackTime,`ackTime`>>
|5000
|The time in milliseconds after which pending offsets are committed when the `ackMode` is `TIME` or `COUNT_TIME`.

|[[assignmentCommitOption]]<<assignmentCommitOption,`assignmentCommitOption`>>
|LATEST_ONLY_NO_TX
|Whether or not to commit the initial position on assignment; by default, the initial offset is committed only if `ConsumerConfig.AUTO_OFFSET_RESET_CONFIG` is `latest`, and the commit does not run in a transaction, even if a transaction manager is present.
See the JavaDocs for `ContainerProperties.AssignmentCommitOption` for more information about the available options.

|[[asyncAcks]]<<asyncAcks,`asyncAcks`>>
|`false`
|Enable out-of-order commits (see xref:kafka/receiving-messages/ooo-commits.adoc[Manually Committing Offsets]); the consumer is paused and commits are deferred until gaps are filled.

|[[authExceptionRetryInterval]]<<authExceptionRetryInterval,`authExceptionRetryInterval`>>
|`null`
|When not null, a `Duration` to sleep between polls when an `AuthenticationException` or `AuthorizationException` is thrown by the Kafka client.
When null, such exceptions are considered fatal and the container will stop.

|[[batchRecoverAfterRollback]]<<batchRecoverAfterRollback,`batchRecoverAfterRollback`>>
|`false`
|Set to `true` to enable batch recovery; see xref:kafka/annotation-error-handling.adoc#after-rollback[After Rollback Processor].

|[[clientId]]<<clientId,`clientId`>>
|(empty string)
|A prefix for the `client.id` consumer property.
Overrides the consumer factory `client.id` property; in a concurrent container, `-n` is added as a suffix for each consumer instance.

|[[checkDeserExWhenKeyNull]]<<checkDeserExWhenKeyNull,`checkDeserExWhenKeyNull`>>
|false
|Set to `true` to always check for a `DeserializationException` header when a `null` `key` is received.
Useful when the consumer code cannot determine that an `ErrorHandlingDeserializer` has been configured, such as when using a delegating deserializer.

|[[checkDeserExWhenValueNull]]<<checkDeserExWhenValueNull,`checkDeserExWhenValueNull`>>
|false
|Set to `true` to always check for a `DeserializationException` header when a `null` `value` is received.
Useful when the consumer code cannot determine that an `ErrorHandlingDeserializer` has been configured, such as when using a delegating deserializer.

|[[commitCallback]]<<commitCallback,`commitCallback`>>
|`null`
|When present and `syncCommits` is `false`, a callback invoked after the commit completes.

|[[commitLogLevel]]<<commitLogLevel,`commitLogLevel`>>
|DEBUG
|The logging level for logs pertaining to committing offsets.

|[[consumerRebalanceListener]]<<consumerRebalanceListener,`consumerRebalanceListener`>>
|`null`
|A rebalance listener; see xref:kafka/receiving-messages/rebalance-listeners.adoc[Rebalancing Listeners].

|[[commitRetries]]<<commitRetries,`commitRetries`>>
|3
|The number of times to retry a commit after a `RetriableCommitFailedException` when `syncCommits` is `true`.
Default 3 (4 attempts in total).

|[[consumerStartTimeout]]<<consumerStartTimeout,`consumerStartTimeout`>>
|30s
|The time to wait for the consumer to start before logging an error; this might happen if, say, you use a task executor with insufficient threads.

|[[deliveryAttemptHeader]]<<deliveryAttemptHeader,`deliveryAttemptHeader`>>
|`false`
|See xref:kafka/annotation-error-handling.adoc#delivery-header[Delivery Attempts Header].

|[[eosMode]]<<eosMode,`eosMode`>>
|`V2`
|Exactly Once Semantics mode; see xref:kafka/exactly-once.adoc[Exactly Once Semantics].

|[[fixTxOffsets]]<<fixTxOffsets,`fixTxOffsets`>>
|`false`
|When consuming records produced by a transactional producer, and the consumer is positioned at the end of a partition, the lag can incorrectly be reported as greater than zero, due to the pseudo record used to indicate transaction commit/rollback and, possibly, the presence of rolled-back records.
This does not functionally affect the consumer but some users have expressed concern that the "lag" is non-zero.
Set this property to `true` and the container will correct such mis-reported offsets.
The check is performed before the next poll to avoid adding significant complexity to the commit processing.
At the time of writing, the lag will only be corrected if the consumer is configured with `isolation.level=read_committed` and `max.poll.records` is greater than 1.
See https://issues.apache.org/jira/browse/KAFKA-10683[KAFKA-10683] for more information.

|[[groupId]]<<groupId,`groupId`>>
|`null`
|Overrides the consumer `group.id` property; automatically set by the `@KafkaListener` `id` or `groupId` property.

|[[idleBeforeDataMultiplier]]<<idleBeforeDataMultiplier,`idleBeforeDataMultiplier`>>
|5.0
|Multiplier for `idleEventInterval` that is applied before any records are received.
After a record is received, the multiplier is no longer applied.
Available since version 2.8.

|[[idleBetweenPolls]]<<idleBetweenPolls,`idleBetweenPolls`>>
|0
|Used to slow down deliveries by sleeping the thread between polls.
The time to process a batch of records plus this value must be less than the `max.poll.interval.ms` consumer property.

|[[idleEventInterval]]<<idleEventInterval,`idleEventInterval`>>
|`null`
|When set, enables publication of `ListenerContainerIdleEvent`+++s+++, see xref:kafka/events.adoc[Application Events] and xref:kafka/events.adoc#idle-containers[Detecting Idle and Non-Responsive Consumers].
Also see `idleBeforeDataMultiplier`.

|[[idlePartitionEventInterval]]<<idlePartitionEventInterval,`idlePartitionEventInterval`>>
|`null`
|When set, enables publication of `ListenerContainerIdlePartitionEvent`+++s+++, see xref:kafka/events.adoc[Application Events] and xref:kafka/events.adoc#idle-containers[Detecting Idle and Non-Responsive Consumers].

|[[kafkaConsumerProperties]]<<kafkaConsumerProperties,`kafkaConsumerProperties`>>
|None
|Used to override any arbitrary consumer properties configured on the consumer factory.

|[[kafkaAwareTransactionManager]]<<kafkaAwareTransactionManager,`kafkaAwareTransactionManager`>>
|`null`
|See xref:kafka/transactions.adoc[Transactions].

|[[listenerTaskExecutor]]<<listenerTaskExecutor,`listenerTaskExecutor`>>
|`SimpleAsyncTaskExecutor`
|A task executor to run the consumer threads.
The default executor creates threads named `<name>-C-n`; with the `KafkaMessageListenerContainer`, the name is the bean name; with the `ConcurrentMessageListenerContainer` the name is the bean name suffixed with `-n` where n is incremented for each child container.

|[[logContainerConfig]]<<logContainerConfig,`logContainerConfig`>>
|`false`
|Set to `true` to log at INFO level all container properties.

|[[messageListener]]<<messageListener,`messageListener`>>
|`null`
|The message listener.

|[[micrometerEnabled]]<<micrometerEnabled,`micrometerEnabled`>>
|`true`
|Whether or not to maintain Micrometer timers for the consumer threads.

|[[micrometerTags]]<<micrometerTags,`micrometerTags`>>
|empty
|A map of static tags to be added to micrometer metrics.

|[[micrometerTagsProvider]]<<micrometerTagsProvider,`micrometerTagsProvider`>>
|`null`
|A function that provides dynamic tags, based on the consumer record.

|[[missingTopicsFatal]]<<missingTopicsFatal,`missingTopicsFatal`>>
|`false`
|When `true`, prevents the container from starting if the configured topic(s) are not present on the broker.

|[[monitorInterval]]<<monitorInterval,`monitorInterval`>>
|30s
|How often to check the state of the consumer threads for `NonResponsiveConsumerEvent`+++s+++.
See `noPollThreshold` and `pollTimeout`.

|[[noPollThreshold]]<<noPollThreshold,`noPollThreshold`>>
|3.0
|Multiplied by `pollTimeout` to determine whether to publish a `NonResponsiveConsumerEvent`.
See `monitorInterval`.

|[[observationConvention]]<<observationConvention,`observationConvention`>>
|`null`
|When set, add dynamic tags to the timers and traces, based on information in the consumer records.

|[[observationEnabled]]<<observationEnabled,`observationEnabled`>>
|`false`
|Set to `true` to enable observation via Micrometer.

|[[offsetAndMetadataProvider]]<<offsetAndMetadataProvider,`offsetAndMetadataProvider`>>
|`null`
|A provider for `OffsetAndMetadata`; by default, the provider creates an offset and metadata with empty metadata. The provider gives a way to customize the metadata.

|[[onlyLogRecordMetadata]]<<onlyLogRecordMetadata,`onlyLogRecordMetadata`>>
|`true`
|Set to `false` to log the complete consumer record (in error, debug logs etc.) instead of just `topic-partition@offset`.

|[[pauseImmediate]]<<pauseImmediate,`pauseImmediate`>>
|`false`
|When the container is paused, stop processing after the current record instead of after processing all the records from the previous poll; the remaining records are retained in memory and will be passed to the listener when the container is resumed.

|[[pollTimeout]]<<pollTimeout,`pollTimeout`>>
|5000
|The timeout passed into `Consumer.poll()` in milliseconds.

|[[pollTimeoutWhilePaused]]<<pollTimeoutWhilePaused,`pollTimeoutWhilePaused`>>
|100
|The timeout passed into `Consumer.poll()` (in milliseconds) when the container is in a paused state.

|[[restartAfterAuthExceptions]]<<restartAfterAuthExceptions,`restartAfterAuthExceptions`>>
|false
|Set to `true` to restart the container if it is stopped due to authorization/authentication exceptions.

|[[scheduler]]<<scheduler,`scheduler`>>
|`ThreadPoolTaskScheduler`
|A scheduler on which to run the consumer monitor task.

|[[shutdownTimeout]]<<shutdownTimeout,`shutdownTimeout`>>
|10000
|The maximum time in ms to block the `stop()` method until all consumers stop and before publishing the container stopped event.

|[[stopContainerWhenFenced]]<<stopContainerWhenFenced,`stopContainerWhenFenced`>>
|`false`
|Stop the listener container if a `ProducerFencedException` is thrown.
See xref:kafka/annotation-error-handling.adoc#after-rollback[After-rollback Processor] for more information.

|[[stopImmediate]]<<stopImmediate,`stopImmediate`>>
|`false`
|When the container is stopped, stop processing after the current record instead of after processing all the records from the previous poll.

|[[subBatchPerPartition]]<<subBatchPerPartition,`subBatchPerPartition`>>
|See desc.
|When using a batch listener, if this is `true`, the listener is called with the results of the poll split into sub batches, one per partition.
Default `false`.

|[[syncCommitTimeout]]<<syncCommitTimeout,`syncCommitTimeout`>>
|`null`
|The timeout to use when `syncCommits` is `true`.
When not set, the container will attempt to determine the `default.api.timeout.ms` consumer property and use that; otherwise it will use 60 seconds.

|[[syncCommits]]<<syncCommits,`syncCommits`>>
|`true`
|Whether to use sync or async commits for offsets; see `commitCallback`.

|[[topics]]<<topics,`topics` `topicPattern` `topicPartitions`>>
|n/a
|The configured topics, topic pattern or explicitly assigned topics/partitions.
Mutually exclusive; at least one must be provided; enforced by `ContainerProperties` constructors.

|[[transactionManager]]<<transactionManager,`transactionManager`>>
|`null`
|Deprecated since 3.2, see <<kafkaAwareTransactionManager>>, xref:kafka/transactions.adoc#transaction-synchronization[Other transaction managers].
|===
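
Several of the properties above interact arithmetically (`commitRetries`, `noPollThreshold` with `pollTimeout`, `idleBeforeDataMultiplier` with `idleEventInterval`, and `idleBetweenPolls` with `max.poll.interval.ms`).
The following is a minimal sketch that restates those relationships in plain Java; it is not the container's implementation, and the class name, method names, and sample values are hypothetical.

```java
import java.time.Duration;

// Illustrative only: restates the relationships described in the table above.
// Class and method names are hypothetical, not part of Spring for Apache Kafka.
public class ContainerTimingSketch {

    /** Total commit attempts: the initial attempt plus {@code commitRetries} retries. */
    public static int totalCommitAttempts(int commitRetries) {
        return commitRetries + 1;
    }

    /** The no-poll window: {@code noPollThreshold} multiplied by {@code pollTimeout}. */
    public static Duration nonResponsiveThreshold(float noPollThreshold, long pollTimeoutMs) {
        return Duration.ofMillis((long) (noPollThreshold * pollTimeoutMs));
    }

    /** The idle interval in effect before any record has been received. */
    public static Duration idleIntervalBeforeData(long idleEventIntervalMs, double idleBeforeDataMultiplier) {
        return Duration.ofMillis((long) (idleEventIntervalMs * idleBeforeDataMultiplier));
    }

    /** Batch processing time plus {@code idleBetweenPolls} must stay below {@code max.poll.interval.ms}. */
    public static boolean fitsMaxPollInterval(long batchProcessingMs, long idleBetweenPollsMs,
            long maxPollIntervalMs) {
        return batchProcessingMs + idleBetweenPollsMs < maxPollIntervalMs;
    }

    public static void main(String[] args) {
        // Defaults from the table: commitRetries=3, noPollThreshold=3.0, pollTimeout=5000,
        // idleBeforeDataMultiplier=5.0. The idleEventInterval of 60000 ms is a sample value.
        System.out.println(totalCommitAttempts(3));                  // 4
        System.out.println(nonResponsiveThreshold(3.0f, 5000L));     // PT15S
        System.out.println(idleIntervalBeforeData(60_000L, 5.0));    // PT5M
        // Sample values: 200 s of processing plus 150 s of idling exceeds a 300 s limit.
        System.out.println(fitsMaxPollInterval(200_000L, 150_000L, 300_000L)); // false
    }
}
```

With the defaults, a consumer that has not polled for 15 seconds crosses the `noPollThreshold` window, and that condition is evaluated once per `monitorInterval`.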

[[alc-props]]
.`AbstractListenerContainer` Properties
[cols="9,10,16", options="header"]
|===
| Property
| Default
| Description

|[[afterRollbackProcessor]]<<afterRollbackProcessor,`afterRollbackProcessor`>>
|`DefaultAfterRollbackProcessor`
|An `AfterRollbackProcessor` to invoke after a transaction is rolled back.

|[[applicationEventPublisher]]<<applicationEventPublisher,`applicationEventPublisher`>>
|application context
|The event publisher.

|[[batchErrorHandler]]<<batchErrorHandler,`batchErrorHandler`>>
|See desc.
|Deprecated - see `commonErrorHandler`.

|[[batchInterceptor]]<<batchInterceptor,`batchInterceptor`>>
|`null`
|Set a `BatchInterceptor` to call before invoking the batch listener; does not apply to record listeners.
Also see `interceptBeforeTx`.

|[[beanName]]<<beanName,`beanName`>>
|bean name
|The bean name of the container; suffixed with `-n` for child containers.

|[[commonErrorHandler]]<<commonErrorHandler,`commonErrorHandler`>>
|See desc.
|`DefaultErrorHandler`, or `null` when a `transactionManager` is provided, in which case a `DefaultAfterRollbackProcessor` is used.
See xref:kafka/annotation-error-handling.adoc#error-handlers[Container Error Handlers].

|[[containerProperties]]<<containerProperties,`containerProperties`>>
|`ContainerProperties`
|The container properties instance.

|[[groupId2]]<<groupId2,`groupId`>>
|See desc.
|The `containerProperties.groupId`, if present, otherwise the `group.id` property from the consumer factory.

|[[interceptBeforeTx]]<<interceptBeforeTx,`interceptBeforeTx`>>
|`true`
|Determines whether the `recordInterceptor` or `batchInterceptor` is called before or after a transaction starts.

|[[listenerId]]<<listenerId,`listenerId`>>
|See desc.
|The bean name for user-configured containers or the `id` attribute of `@KafkaListener`+++s+++.

|[[listenerInfo]]<<listenerInfo,`listenerInfo`>>
|null
|A value to populate in the `KafkaHeaders.LISTENER_INFO` header.
With `@KafkaListener`, this value is obtained from the `info` attribute.
This header can be used in various places, such as a `RecordInterceptor`, `RecordFilterStrategy` and in the listener code itself.

|[[pauseRequested]]<<pauseRequested,`pauseRequested`>>
|(read only)
|True if a consumer pause has been requested.

|[[recordInterceptor]]<<recordInterceptor,`recordInterceptor`>>
|`null`
|Set a `RecordInterceptor` to call before invoking the record listener; does not apply to batch listeners.
Also see `interceptBeforeTx`.

|[[topicCheckTimeout]]<<topicCheckTimeout,`topicCheckTimeout`>>
|30s
|When the `missingTopicsFatal` container property is `true`, how long to wait, in seconds, for the `describeTopics` operation to complete.
|===
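
As a configuration sketch, the following shows how a few `ContainerProperties` values and the `commonErrorHandler` container property might be set programmatically.
The topic, group, and client id names, the chosen values, and the `ContainerConfigSketch` class are hypothetical; the `ConsumerFactory` is assumed to be configured elsewhere.

```java
import java.time.Duration;

import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.util.backoff.FixedBackOff;

// Illustrative only: all names and values below are sample choices, not recommendations.
public class ContainerConfigSketch {

    public static KafkaMessageListenerContainer<String, String> ordersContainer(
            ConsumerFactory<String, String> consumerFactory) {

        // Topics are supplied through the ContainerProperties constructor.
        ContainerProperties props = new ContainerProperties("orders");
        props.setGroupId("orders-group");   // overrides the consumer factory group.id
        props.setClientId("orders-client"); // prefix for the client.id consumer property

        // Commit pending offsets after 100 records or 2 seconds, whichever comes first.
        props.setAckMode(ContainerProperties.AckMode.COUNT_TIME);
        props.setAckCount(100);
        props.setAckTime(2_000L);

        props.setPollTimeout(3_000L);
        props.setSyncCommits(true);
        props.setCommitRetries(5);
        props.setIdleEventInterval(60_000L);
        props.setMissingTopicsFatal(true);
        // Retry instead of stopping when authentication or authorization fails.
        props.setAuthExceptionRetryInterval(Duration.ofSeconds(10));

        props.setMessageListener((MessageListener<String, String>) record ->
                System.out.println(record.topic() + "-" + record.partition() + "@" + record.offset()));

        KafkaMessageListenerContainer<String, String> container =
                new KafkaMessageListenerContainer<>(consumerFactory, props);
        // Retry a failed record twice, one second apart, before the handler gives up.
        container.setCommonErrorHandler(new DefaultErrorHandler(new FixedBackOff(1_000L, 2L)));
        return container;
    }
}
```

The caller is responsible for invoking `start()` on the returned container, or for registering it as a bean so that its lifecycle is managed by the application context.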

.`KafkaMessageListenerContainer` Properties
[cols="8,3,16", options="header"]
|===
| Property
| Default
| Description

|[[assignedPartitions]]<<assignedPartitions,`assignedPartitions`>>
|(read only)
|The partitions currently assigned to this container (explicitly or not).

|[[assignedPartitionsByClientId]]<<assignedPartitionsByClientId,`assignedPartitionsByClientId`>>
|(read only)
|The partitions currently assigned to this container (explicitly or not), keyed by the consumer's `client.id` property.

|[[clientIdSuffix]]<<clientIdSuffix,`clientIdSuffix`>>
|`null`
|Used by the concurrent container to give each child container's consumer a unique `client.id`.

|[[containerPaused]]<<containerPaused,`containerPaused`>>
|n/a
|True if pause has been requested and the consumer has actually paused.
|===

.`ConcurrentMessageListenerContainer` Properties
[cols="8,3,16", options="header"]
|===
| Property
| Default
| Description

|[[alwaysClientIdSuffix]]<<alwaysClientIdSuffix,`alwaysClientIdSuffix`>>
|`true`
|Set to false to suppress adding a suffix to the `client.id` consumer property, when the `concurrency` is only 1.

|[[assignedPartitions2]]<<assignedPartitions2,`assignedPartitions`>>
|(read only)
|The aggregate of partitions currently assigned to this container's child `KafkaMessageListenerContainer`+++s+++ (explicitly or not).

|[[assignedPartitionsByClientId2]]<<assignedPartitionsByClientId2,`assignedPartitionsByClientId`>>
|(read only)
|The partitions currently assigned to this container's child `KafkaMessageListenerContainer`+++s+++ (explicitly or not), keyed by the child container's consumer's `client.id` property.

|[[concurrency]]<<concurrency,`concurrency`>>
|1
|The number of child `KafkaMessageListenerContainer`+++s+++ to manage.

|[[containerPaused2]]<<containerPaused2,`containerPaused`>>
|n/a
|True if pause has been requested and all child containers' consumers have actually paused.

|[[containers]]<<containers,`containers`>>
|n/a
|A reference to all child `KafkaMessageListenerContainer`+++s+++.
|===
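
The interaction between the `clientId` prefix, `concurrency`, and `alwaysClientIdSuffix` can be sketched in plain Java as follows.
This restates the rule from the tables above rather than reproducing the container's code; the class and method names are hypothetical, and the assumption that the `-n` suffix starts at 0 is not stated in the tables.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: models how client.id values might be derived for child consumers.
public class ClientIdNamingSketch {

    /**
     * Returns one client.id per child consumer.
     * Assumption: the numeric suffix starts at 0 and increases by one per child.
     */
    public static List<String> clientIds(String clientIdPrefix, int concurrency,
            boolean alwaysClientIdSuffix) {
        // The suffix is added unless concurrency is 1 and alwaysClientIdSuffix is false.
        boolean addSuffix = concurrency > 1 || alwaysClientIdSuffix;
        List<String> ids = new ArrayList<>();
        for (int i = 0; i < concurrency; i++) {
            ids.add(addSuffix ? clientIdPrefix + "-" + i : clientIdPrefix);
        }
        return ids;
    }

    public static void main(String[] args) {
        System.out.println(clientIds("orders", 3, true));  // [orders-0, orders-1, orders-2]
        System.out.println(clientIds("orders", 1, true));  // [orders-0]
        System.out.println(clientIds("orders", 1, false)); // [orders]
    }
}
```

Suppressing the suffix only has an effect when `concurrency` is 1, because multiple consumers still need distinct `client.id` values.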

